/*
 *  Copyright (C) 2004 Cidero, Inc.
 *
 *  Permission is hereby granted to any person obtaining a copy of 
 *  this software to use, copy, modify, merge, publish, and distribute
 *  the software for any non-commercial purpose, subject to the
 *  following conditions:
 *  
 *  The above copyright notice and this permission notice shall be included
 *  in all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
 *  OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
 *  THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY IN CONNECTION WITH THE SOFTWARE.
 * 
 *  File: $RCSfile: HTTPProxySession.java,v $
 *
 */
package com.cidero.server;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
//import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Vector;
import java.util.logging.Logger;

import org.cybergarage.xml.XML;

import com.cidero.upnp.*;
import com.cidero.http.*;
import com.cidero.util.ShoutcastOutputStream;
import com.cidero.util.ByteBufferQueue;
import com.cidero.util.NetUtil;
import com.cidero.util.AppPreferences;

/**
 *  Class to hold information associated with an HTTP Proxy 'session'.
 *  The proxy session is used to service multiple clients from a single
 *  HTTP streaming source. So multiple players in a home connected to 
 *  the same Internet radio stream will only have one connection to the
 *  'real' source.
 *
 *  A session owns one reader thread (see {@link #start()} and
 *  {@link #run()}) that pulls data from the source URL, strips any
 *  embedded shoutcast metadata blocks, and pushes fixed-size packets of
 *  audio data into a {@link ByteBufferQueue}.
 */
public class HTTPProxySession implements Runnable
{
  private static final Logger logger = Logger.getLogger("com.cidero.server");

  public  static int QUEUE_PACKET_SIZE = 4096;
  public  static int RENDERER_TIMEOUT_MILLISEC = 12000;

  // Timeout passed to each queue.alloc() attempt while waiting for space
  private static final int QUEUE_ALLOC_TIMEOUT_MILLISEC = 2000;

  // Re-send the last metadata string if the source hasn't sent a new one
  // within this interval
  private static final long METADATA_REFRESH_MILLISEC = 60000;

  // URL of resource that this session is handling
  String    resourceURL;      

  // HTTP user agent for session - used to differentiate device types
  // (only devices with same userAgent value are allowed to attempt to sync)
  String    userAgent;

  // Queue for data transfer between input/output HTTP connections
  ByteBufferQueue  queue;   

  // Group of client connections for this session
  SyncShoutcastGroup syncShoutcastGroup = null;
  
  // Buffer for incoming shoutcast metadata. The length prefix is a single
  // byte scaled by 16, so a block is at most 255*16 = 4080 bytes
  byte[] metadataBuf = new byte[4096];

  long  createTimeMillis = 0;
  
  // Sometimes its handy to throttle read rate (certain net radio situations)
  int  readThrottleBytesPerSec = 32768;

  int syncWaitMillisec;

  // Default is to always request shoutcast data. It only gets inserted
  // in proxy output stream if clients have requested it
  boolean icyMetadataRequestFlag = true; 

  // Reader thread. Doubles as the 'keep running' flag: the read loop exits
  // once this is null. Volatile because stop() clears it from another
  // thread while the reader thread polls it.
  volatile Thread sessionThread = null;

  // Shoutcast stream state, shared between readAndQueueURL() and
  // shoutcastStreamRead()

  // Audio bytes between metadata blocks, or -1 if stream has no metadata
  int icy_metaint;

  // Audio bytes left before the next metadata block (negative if the
  // stream has no metadata)
  int nonMetadataBytesRemaining;

  long lastMetadataUpdateTimeMillis;

  // Most recent metadata block received from the source (null if none yet)
  String metadataString;

  /**
   * Construct an HTTP Proxy session instance
   *                
   * @param resourceURL  
   *        URL of resource for this session. This can be the name of a
   *        single media item, or the name of a playlist 
   *        (.m3u, .pls (TODO) ). 
   *
   * @param userAgent
   *        HTTP user agent string identifying the device type that this
   *        session serves
   * 
   * @param syncWaitMillisec  
   *        Number of milliseconds to wait for multiple devices to join 
   *        synchronized device group.  0 to disable sync
   * 
   * @throws UPnPException declared for callers; not thrown by the 
   *         current implementation
   */
  public HTTPProxySession( String resourceURL,
                           String userAgent,
                           int syncWaitMillisec )
    throws UPnPException
  {
    this.resourceURL = resourceURL;
    this.userAgent = userAgent;
    this.syncWaitMillisec = syncWaitMillisec;
    
    //
    // 128 kbit/sec audio is 16 Kbytes/sec. Allow for 4 secs worth of 
    // buffering (64 k). This buffer size is not critical, since 
    // it is *extra* buffering on top of the buffering done in the
    // absence of the proxy
    //
    queue = new ByteBufferQueue( QUEUE_PACKET_SIZE, 16 );

    createTimeMillis = System.currentTimeMillis();
  }

  /**
   * Close the client group (if any) attached to this session.
   *
   * TODO: Need to clean up and consolidate this a bit
   */
  public void close() 
  {
    if( syncShoutcastGroup != null )
      syncShoutcastGroup.close();  // Exit sketch mode for Soundbridges
  }
  
  /** @return URL of the resource this session is proxying */
  public String getResourceURL() 
  {
    return resourceURL;
  }

  /** @return user agent string supplied when the session was created */
  public String getUserAgent() 
  {
    return userAgent;
  }

  /** @return sync wait time (millisec) supplied at construction */
  public int getSyncWaitMillisec() 
  {
    return syncWaitMillisec;
  }

  /** @return queue that the reader thread fills with source data */
  public ByteBufferQueue getQueue() 
  {
    return queue;
  }

  /** @return System.currentTimeMillis() value captured at construction */
  public long getCreateTimeMillis() 
  {
    return createTimeMillis;
  }

  /**
   * Control whether the 'Icy-Metadata: 1' header is sent to the source.
   * Only affects requests issued after the call.
   */
  public void setIcyMetadataRequestFlag( boolean flag ) 
  {
    icyMetadataRequestFlag = flag;
  }
  
  /**
   *  Join the SynchronizedShoutcastGroup associated with this session.
   *  If no group is active, create one
   *
   *  @param outStream   Output stream of the joining client
   *  @param connection  HTTP connection of the joining client
   *
   *  @return  Reference to new SyncShoutcastGroup, or null if joined 
   *           existing group
   */ 
  public synchronized SyncShoutcastGroup 
                      joinSyncShoutcastGroup( ShoutcastOutputStream outStream,
                                              HTTPConnection connection )
  {
    if( syncShoutcastGroup != null )
    {
      syncShoutcastGroup.addStream( outStream, connection );

      // Return null to let caller know this thread was 'joined' with 
      // existing shoutcast group, and the thread should be terminated
      // without closing the HTTP socket session
      return null;  
    }

    // First requester for this URL - create the group. Soundbridge 
    // metadata display is only enabled for Roku clients, and only if 
    // switched on in the preferences
    String requestUserAgent = 
      connection.getRequest().getHeaderValue(HTTP.USER_AGENT);

    AppPreferences pref = RadioServer.getPreferences();

    boolean enableSoundbridgeMetadata =
      (requestUserAgent != null) && 
      (requestUserAgent.toLowerCase().indexOf("roku") >= 0 ) &&
      pref.getBoolean("soundBridgeSketchDisplayEnabled", false);
          
    syncShoutcastGroup = new SyncShoutcastGroup( this, outStream,
                                                 connection,
                                                 enableSoundbridgeMetadata );
    return syncShoutcastGroup;
  }

  /** 
   * Start the proxy session running in its own thread. Ignored if the
   * session thread is already running.
   */
  public void start()
  {
    if( sessionThread == null )
    {
      sessionThread = new Thread( this );
      sessionThread.start();  // invokes run() method
    }
    else
    {
      logger.fine("session thread already started - ignoring start req");
    }
  }
  
  /**
   * Ask the reader thread to stop, then pause for a second to give it a
   * chance to notice. The reader is not forcibly interrupted.
   */
  public void stop()
  {
    logger.fine( "Session: stopping " );
    sessionThread = null;

    // Wait a bit
    try 
    { 
      Thread.sleep(1000); 
    } 
    catch( InterruptedException e ) 
    { 
      // Preserve the interrupt for the caller rather than swallowing it
      Thread.currentThread().interrupt();
    }
  }

  /**
   *  Run a single UPNP proxy session: read the source URL into the queue
   *  until the source ends or the session is stopped, then close the
   *  queue and stop the client group.
   */
  public void run()
  {
    logger.fine( "HTTPProxySession: running - opening queue " );

    queue.open();   // sets eof state to false, clears queue

    try
    {
      readAndQueueURL( resourceURL );
    }
    catch( MalformedURLException e )
    {
      logger.warning("Exception reading URL " + resourceURL + " " + e  );
    }
    finally
    {
      // Always signal EOF and release the thread slot, even if an
      // unexpected runtime error escaped the read loop
      logger.fine("Closing Queue (EOF)");
      queue.close();  // sends 'EOF' state to queue reader

      // Group is only created once a client has joined
      SyncShoutcastGroup group = syncShoutcastGroup;
      if( group != null )
        group.stop();

      logger.fine("HTTPProxySession: Done - terminating thread");
      sessionThread = null;
    }
  }

  /** 
   * Read data from URL (MediaServer or Internet Radio feed) and pass it 
   * to queue, one packet of QUEUE_PACKET_SIZE bytes at a time.  Returns
   * when the source ends, an error occurs, the session is stopped, or
   * the queue stays full for longer than RENDERER_TIMEOUT_MILLISEC. 
   * Errors other than a bad URL are logged, not thrown.
   *
   * @param urlString  URL to read from
   *
   * @throws MalformedURLException if urlString can't be parsed
   */
  public void readAndQueueURL( String urlString )
    throws MalformedURLException
  {
    logger.fine( "!!!!!Session: readAndQueueURL " + urlString + "\n\n");

    URL url = new URL( urlString );

    BufferedInputStream inStream = null;

    try 
    {
      logger.fine( "Session: Opening connection to: " +
                   url + " Host: " + url.getHost() );
      HTTPConnection conn = new HTTPConnection();
      
      HTTPRequest request = new HTTPRequest( HTTP.GET, url );
      
      // Some shoutcast servers are picky about user-agents they
      // accept connections from.  Use 'Winamp' as opposed to 'Cidero'
      // here to make things work in most cases
      request.addHeader("User-Agent", "Winamp/1.0" );
      request.addHeader("Accept", "*/*" );
      
      // Tell server to send the shoutcast metadata if enabled 
      if( icyMetadataRequestFlag )
        request.addHeader( "Icy-Metadata", "1" );
      
      // Send the request to the server (GET Request if URL is HTTP)
      logger.fine( "Session: Connecting" );
      HTTPResponse response = conn.sendRequest( request, false );
      
      logger.fine( "Session: HTTP Response Headers from server:----" );
      logger.fine( "FirstLine: " + response.getFirstLine() );

      // List all the response headers from server
      for( int n = 0 ; n < response.getNumHeaders() ; n++ )
      {
        logger.fine( response.getHeader(n).toString() );
      }
      
      icy_metaint = parseIcyMetaint( response.getHeaderValue("icy-metaint") );
      metadataString = null;

      logger.finer("icy-metaint after parse = " + icy_metaint );

      if( response.getStatusCode() != HTTPStatus.OK )
      {
        logger.warning("Error from HTTP-GET for url " + urlString );
        logger.warning("HTTPRequest was: " + request.toString() );
        return;
      }
      
      //
      // Read data until connection is closed by server. Read data in 
      // blocks of QUEUE_PACKET_SIZE bytes. The size is sampled once so 
      // the read buffer and the read requests always agree.
      //
      final int packetSize = QUEUE_PACKET_SIZE;
      byte[] buf = new byte[packetSize];
      long contentBytes = 0;
      
      inStream = new BufferedInputStream( response.getInputStream() );
      
      nonMetadataBytesRemaining = icy_metaint;

      while( sessionThread != null )
      {
        //
        // Always read fixed-size chunks from source before putting in
        // queue. This makes it easier to do other things downstream (such
        // as inserting shoutcast data)
        //
        int bytesRead = shoutcastStreamRead( inStream, buf, packetSize );
        
        //
        // Write the data to the queue.
        // TODO: this could be done without an intermediate buf - use
        // something like gBuf.getBuf() in read above (OJN)
        //
        if( bytesRead > 0 )
        {
          ByteBuffer qBuf;
          int waitMillisec = 0;
          
          while( (qBuf = queue.alloc( QUEUE_ALLOC_TIMEOUT_MILLISEC )) == null )
          {
            if( sessionThread == null )
              break;

            logger.fine("Session: Timeout writing to queue");

            //
            // Keep track of total wait time and shut session down if
            // wait time exceeds threshold (nominally 12 sec). Don't want
            // to block Internet feed and waste resources on their end
            //
            waitMillisec += QUEUE_ALLOC_TIMEOUT_MILLISEC;
            if( waitMillisec > RENDERER_TIMEOUT_MILLISEC )  // Nom 12 sec
            {
              logger.info("Renderer timeout - closing proxy session");
              sessionThread = null;
              break;
            }
          }

          // No buffer obtained, or stop requested while waiting
          if( (qBuf == null) || (sessionThread == null) )
            break;

          qBuf.put( buf, 0, bytesRead );  // Copy data to queue buffer
          queue.put( qBuf );

          contentBytes += bytesRead;
        }

        // A short read means the source hit EOF or an error
        if( bytesRead < packetSize )
        {
          logger.fine("Session: bytesRead = " + bytesRead + " DONE..." );
          break;
        }
        
        //Thread.sleep( 1000*QUEUE_PACKET_SIZE/readThrottleBytesPerSec );

      } // while( sessionThread != null )

      logger.fine("Session: Total contentBytes = " +
                         contentBytes );
    }
    catch( Exception e )
    {
      logger.warning("Session: readURL: Exception" + e );
    }
    finally
    {
      // Release the connection to the source
      if( inStream != null )
      {
        try
        {
          inStream.close();
        }
        catch( IOException e )
        {
          logger.fine("Session: error closing source stream " + e );
        }
      }
    }
  }

  /**
   * Convert the value of the 'icy-metaint' response header to the form 
   * used by the read logic.
   *
   * @param headerValue  Raw header value (may be null)
   *
   * @return Positive metadata interval in bytes, or -1 if the header is
   *         missing, malformed, or not positive (stream is then treated
   *         as having no embedded metadata)
   */
  private static int parseIcyMetaint( String headerValue )
  {
    if( headerValue == null )
      return -1;

    try
    {
      int interval = Integer.parseInt( headerValue.trim() );
      return (interval > 0) ? interval : -1;
    }
    catch( NumberFormatException e )
    {
      logger.warning("Ignoring malformed icy-metaint header: " +
                     headerValue );
      return -1;
    }
  }

  /**
   * Read up to 'bytes' bytes of audio data from a (possibly) shoutcast
   * stream into buf, consuming any embedded metadata blocks encountered
   * on the way.  Metadata blocks are not copied to buf; they are stored 
   * in metadataString and passed to the client group (if there is one).
   * If the source keeps sending empty metadata blocks, the last 
   * non-empty one is re-sent to the group about once a minute.
   *
   * Position within the stream is tracked in nonMetadataBytesRemaining,
   * which the caller must initialize to icy_metaint before the first
   * call.
   *
   * @param inStream  Source stream
   * @param buf       Destination for audio data (at least 'bytes' long)
   * @param bytes     Number of audio bytes wanted
   *
   * @return Number of audio bytes stored in buf. A value less than 
   *         'bytes' means end-of-stream or a read error was encountered.
   */
  public int shoutcastStreamRead( BufferedInputStream inStream,
                                  byte[] buf, int bytes )
  {
    int bytesRemaining = bytes;

    try
    {
      while( bytesRemaining > 0 )
      {
        // Only read up to point where metadata exists
        int byteRequest = bytesRemaining;
        if( (nonMetadataBytesRemaining >= 0) && 
            (byteRequest > nonMetadataBytesRemaining) )
        {
          byteRequest = nonMetadataBytesRemaining;
        }
        
        while( byteRequest > 0 )
        {
          int bytesRead = inStream.read( buf, bytes-bytesRemaining,
                                         byteRequest );
          if( bytesRead <= 0 )
            return bytes-bytesRemaining;   // end of stream

          byteRequest -= bytesRead;
          bytesRemaining -= bytesRead;
          if( nonMetadataBytesRemaining >= 0 )
            nonMetadataBytesRemaining -= bytesRead; 
        }
        
        // Still short of the request here only if a metadata block is next
        if( bytesRemaining > 0 )
        {
          // Metadata block starts with a length byte, in units of 16 bytes
          int lengthByte = inStream.read();
          if( lengthByte < 0 )
            return bytes-bytesRemaining;   // end of stream

          int metadataBytes = lengthByte * 16;
          logger.finest("--metadata bytes = " + metadataBytes );

          long currTime = System.currentTimeMillis();

          if( metadataBytes > 0 )
          {
            int metadataBytesRemaining = metadataBytes;

            while( metadataBytesRemaining > 0 )
            {
              int bytesRead = 
                inStream.read( metadataBuf,
                               metadataBytes-metadataBytesRemaining,
                               metadataBytesRemaining );
              if( bytesRead <= 0 )
              {
                // Hand back the audio already read; short count tells 
                // the caller the stream is finished
                logger.warning("Error reading metadata block");
                return bytes-bytesRemaining;
              }
              
              metadataBytesRemaining -= bytesRead;
            }

            metadataString = new String( metadataBuf, 0, metadataBytes );
            logger.fine("Metadata: " + metadataString );
            
            SyncShoutcastGroup group = syncShoutcastGroup;
            if( group != null )
              group.processMetadata( metadataString );
            lastMetadataUpdateTimeMillis = currTime;
          }
          else if( metadataString != null ) 
          {
            // Force update once/min if server isn't updating it
            long timeSinceLastMetadataUpdate = 
              currTime - lastMetadataUpdateTimeMillis;
            if( timeSinceLastMetadataUpdate > METADATA_REFRESH_MILLISEC )
            {
              SyncShoutcastGroup group = syncShoutcastGroup;
              if( group != null )
                group.processMetadata( metadataString );
              lastMetadataUpdateTimeMillis = currTime;
            }
          }

          // Done - reset counter to next metadata block
          nonMetadataBytesRemaining = icy_metaint;
        }

      } // while( bytesRemaining > 0 )
    }
    catch( Exception e )
    {
      logger.warning("Session: Exception" + e );
    }

    return bytes-bytesRemaining;
  }

}
